package com.huan.table

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
 * Demo: querying a CSV-backed table with both the Table API and SQL.
 *
 * Reads sensor readings (id, timestamp, temperature) from a CSV file,
 * registers them as a temporary table "inputTable", runs one query with
 * the Table API and one with SQL, and prints both results as append streams.
 *
 * Usage: optionally pass the CSV file path as the first program argument;
 * otherwise the bundled sample path is used.
 */
object The_query {
  def main(args: Array[String]): Unit = {
    // Create the streaming environment; parallelism 1 keeps the printed output ordered.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(env)

    // Input path: first CLI argument if given, falling back to the original sample file
    // (backward compatible with the previous hard-coded path).
    val filePath = args.headOption.getOrElse(
      "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt")

    tableEnv.connect(new FileSystem().path(filePath)) // define the source
      .withFormat(new Csv()) // CSV format
      .withSchema(new Schema() // table schema: (id, timestamp, temperature)
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable") // register as a temporary table

    // 1. Query via the Table API: project (id, temperature), keep only sensor_1.
    val sensorTable = tableEnv.from("inputTable")
    val resultTable = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1")

    // 2. Equivalent query via SQL. NOTE: it deliberately filters a different
    //    sensor id (sensor_10) so the two printed streams are distinguishable.
    val resultSQLTable = tableEnv.sqlQuery(
      """
        |select id,temperature from inputTable
        |where id = 'sensor_10'
        |""".stripMargin)

    // Both queries are pure projection/filter (no aggregation or retraction),
    // so an append stream is sufficient for printing.
    resultTable.toAppendStream[(String, Double)].print("TableApi")
    resultSQLTable.toAppendStream[(String, Double)].print("SQL")

    env.execute("The_query")
  }
}
